# Persist logs to Elasticsearch.

input {
    kafka {
        bootstrap_servers => "{{ monasca_kafka_servers }}"
        topics => ["{{ monasca_raw_logs_topic }}"]
        group_id => "log_persister"
        consumer_threads => "{{ monasca_log_pipeline_threads }}"
        codec => json
    }
}

filter {
    # Update the timestamp of the event based on the time in the message.
    date {
        match => [ "[log][dimensions][timestamp]", "yyyy-MM-dd HH:mm:ss Z", "ISO8601"]
        remove_field => [ "[log][dimensions][timestamp]", "[log][dimensions][Timestamp]" ]
    }

    # Monasca Log API adds a timestamp when it processes a log entry. This
    # timestamp needs to be converted from seconds since the epoch for
    # Elasticsearch to parse it correctly. Here we make that conversion.
    date {
        match => ["creation_time", "UNIX"]
        target => "creation_time"
    }

    # OpenStack log levels are uppercase, and syslog are lowercase.
    # Furthermore, syslog has more log levels that OpenStack. To avoid
    # mapping syslog log levels to OpenStack log levels, we standardise
    # on the syslog style here.
    if [log][dimensions][log_level] {
        mutate {
            lowercase => [ "[log][dimensions][log_level]" ]
        }
    }
}

output {
    elasticsearch {
        index => "monasca-%{[meta][tenantId]}-%{+YYYY.MM.dd}"
        hosts => [{{ monasca_elasticsearch_servers }}]
        document_type => "log"
        template_name => "monasca"
        template => "/etc/logstash/elasticsearch-template.json"
    }
}
